/**
 * Copyright [2020] [LiBo/Alex of copyright liboware@gmail.com ]
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.hyts.stream.engine.stream;

import com.hyts.stream.engine.model.WordEvent;
import com.hyts.stream.engine.source.CustomSource;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.Random;

/**
 * @project-name:lhy-stream
 * @package-name:com.hyts.stream.engine.execute
 * @author:LiBo/Alex
 * @create-date:2022-05-13 21:21
 * @copyright:libo-alex4java
 * @email:liboware@gmail.com
 * @description:
 */
public class TimeScaleStreamExecutor {


    public static void main(String[] args) {


        try {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStreamSource<WordEvent> inputDS = env.addSource(new CustomSource());

            SingleOutputStreamOperator<WordEvent> mapDS = inputDS.map(line -> {
                return new WordEvent(line.getWord(), line.getCount(), line.getTimestamp());
            }).assignTimestampsAndWatermarks(WatermarkStrategy.<WordEvent>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<WordEvent>() {
                @Override
                public long extractTimestamp(WordEvent userBehavior, long l) {
                    return userBehavior.getTimestamp() * 1000L;
                }

            })).keyBy(WordEvent::getWord).
                window(TumblingProcessingTimeWindows.of(Time.seconds(5))).
                    aggregate(new AggregateFunction<WordEvent, Object, WordEvent>() {

                Random random = new Random();

                @Override
                public Object createAccumulator() {
                    return new WordEvent();
                }

                @Override
                public Object add(WordEvent value, Object accumulator) {
                    System.out.printf("param1:%s - param2%s \n", value, accumulator);
                    WordEvent param2 = (WordEvent) accumulator;
                    value.setCount(value.getCount() + param2.getCount());
                    if(random.nextInt(2) % 2 == 0){
                        value.setCount2(value.getCount());
                    }else{
                        value.setCount(value.getCount()+1);
                    }
                    return value;
                }

                @Override
                public WordEvent getResult(Object accumulator) {
//                    System.out.println("---result--"+accumulator);
                    return (WordEvent) accumulator;
                }

                @Override
                public Object merge(Object a, Object b) {
                    System.out.printf("-%s--merge--%s-\n", a, b);
                    return null;
                }
            }, new ProcessWindowFunction<WordEvent, WordEvent, String, TimeWindow>() {
                @Override
                public void process(String s, ProcessWindowFunction<WordEvent, WordEvent,
                                    String, TimeWindow>.Context context,
                                    Iterable<WordEvent> elements, Collector<WordEvent> out) throws Exception {
                    WordEvent event = elements.iterator().next();
                    System.out.printf("params:%s : data:%s\n",s,event);
                    out.collect(event);
                }
            }).keyBy(WordEvent::getWord).process(new KeyedProcessFunction<String, WordEvent, WordEvent>() {

                //定义一个状态，保存当前的总count值
                ValueState<WordEvent> totalCountState;

                @Override
                public void open(Configuration parameters) throws Exception {
                    totalCountState = getRuntimeContext().getState(new ValueStateDescriptor<WordEvent>
                            ("total-count", WordEvent.class, new WordEvent()));
                }
                @Override
                public void processElement(WordEvent value,
                                           KeyedProcessFunction<String, WordEvent, WordEvent>.Context ctx,
                                           Collector<WordEvent> out) throws Exception {
                    WordEvent valueElement = totalCountState.value();
                    totalCountState.update(new WordEvent(value.getWord(),
                            value.getCount() + valueElement.getCount(),valueElement.getTimestamp()));
                    ctx.timerService().registerEventTimeTimer(valueElement.getTimestamp() + 1);
                }
                @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<WordEvent> out) throws Exception {
                    //所有分组count值都到齐，直接输出当前的总count值
                    WordEvent totalCount = totalCountState.value();
                    out.collect(new WordEvent(ctx.getCurrentKey(),totalCount.getCount() ,totalCount.getCount2(), totalCount.getTimestamp()));
                    System.out.printf("--onTimer:%s---\n",totalCount);
                    totalCountState.clear();
                }
            });
            mapDS.addSink(new RichSinkFunction<WordEvent>() {
                @Override
                public void invoke(WordEvent value, Context context) throws Exception {
                    System.out.println("---sink----" + value);
                }
            });

            env.execute();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

}
